Skip to main content

SciPy for ML

A senior ML engineer inherits this hyperparameter tuning script:

# The existing "tuning" script
import numpy as np

best_loss = float("inf")
best_alpha = 0.01

for alpha in [0.0001, 0.001, 0.01, 0.1, 1.0, 10.0]:
for n_estimators in [50, 100, 200, 500]:
loss = evaluate_model(alpha=alpha, n_estimators=n_estimators)
if loss < best_loss:
best_loss = loss
best_alpha = alpha

print(f"Best alpha: {best_alpha}, loss: {best_loss}")

This grid search evaluates 24 combinations. The search space has two parameters, each with 4-6 discrete values. But the real hyperparameter space is continuous - alpha lives on [1e-5, 100]. Grid search misses the optimal point by construction, and it scales exponentially with the number of parameters.

There is a better way, and it has been in SciPy for twenty years:

from scipy.optimize import differential_evolution

def objective(params):
alpha, log_n = params
n_estimators = int(10 ** log_n) # search on log scale
return evaluate_model(alpha=alpha, n_estimators=n_estimators)

result = differential_evolution(
objective,
bounds=[(1e-5, 100), (1.5, 3.0)], # log10(n): 31 to 1000
maxiter=50,
seed=42,
workers=-1, # parallel evaluation
)

print(f"Best params: alpha={result.x[0]:.4f}, n={int(10**result.x[1])}")
print(f"Best loss: {result.fun:.4f}")

Twenty iterations of differential evolution with parallel evaluation will outperform a grid search over 24 points on almost every real optimisation landscape.

SciPy is the library that fills the gap between "raw NumPy array arithmetic" and "full ML framework". It contains the algorithms that ML frameworks rely on internally but expose poorly or not at all: gradient-free optimisation, sparse matrix formats, statistical tests, pairwise distance computation, and signal processing.

This lesson teaches you which SciPy tools belong in every ML engineer's toolkit and precisely when to reach for each one.

What You Will Learn

  • scipy.optimize: gradient-free optimisation (minimize, differential_evolution, curve fitting) for hyperparameter search, model calibration, and custom loss minimisation
  • Sparse matrices: CSR and CSC formats, when to use them, NLP and recommender system patterns
  • scipy.stats: hypothesis testing for A/B decisions, distribution fitting, KS tests for data drift detection, confidence intervals
  • scipy.spatial.distance: pairwise distance computation (pdist, cdist) for KNN, clustering, and embedding similarity
  • scipy.signal: filtering and spectral features for time series ML
  • scipy.linalg vs numpy.linalg: when the difference matters

Prerequisites

  • NumPy Internals (Lesson 01), Pandas for ML (Lesson 02)
  • Basic calculus: what a gradient is, what a minimum is
  • Basic statistics: mean, variance, what a p-value means

Part 1 - scipy.optimize: Optimisation Without Gradients

PyTorch and TensorFlow give you gradient-based optimisation. But many ML problems do not expose gradients: black-box model hyperparameters, constraint satisfaction, curve fitting to empirical data. This is where scipy.optimize lives.

minimize() for General Optimisation

from scipy.optimize import minimize
import numpy as np

# ------------------------------------------------------------------ #
# Use case 1: Calibrate a model's output probabilities (Platt scaling)
# ------------------------------------------------------------------ #
# After training a classifier, the raw scores may not be well-calibrated.
# Platt scaling fits a logistic function to map raw scores to calibrated probs.
# The calibration is a 2-parameter optimisation: find (a, b) that minimise
# log-loss on a held-out calibration set.

def platt_log_loss(params, raw_scores, true_labels):
"""Negative log-likelihood under logistic calibration."""
a, b = params
# Calibrated probabilities
probs = 1.0 / (1.0 + np.exp(-(a * raw_scores + b)))
probs = np.clip(probs, 1e-9, 1 - 1e-9)
return -np.mean(
true_labels * np.log(probs) + (1 - true_labels) * np.log(1 - probs)
)

# Simulate uncalibrated model output
np.random.seed(42)
n_cal = 5_000
raw_scores = np.random.randn(n_cal) # model's raw logits
true_labels = (raw_scores + np.random.randn(n_cal) * 0.8 > 0).astype(float)

result = minimize(
platt_log_loss,
x0=[1.0, 0.0], # initial guess: identity (no calibration)
args=(raw_scores, true_labels),
method="L-BFGS-B", # quasi-Newton, handles smooth objectives well
bounds=[(-10, 10), (-10, 10)],
options={"maxiter": 500, "ftol": 1e-10},
)

a_opt, b_opt = result.x
print(f"Optimal Platt params: a={a_opt:.4f}, b={b_opt:.4f}")
print(f"Log-loss: {result.fun:.4f}, Success: {result.success}")

def calibrated_prob(raw_score, a, b):
return 1.0 / (1.0 + np.exp(-(a * raw_score + b)))

probs = calibrated_prob(raw_scores, a_opt, b_opt)
print(f"Calibrated prob range: [{probs.min():.3f}, {probs.max():.3f}]")

Available Methods and When to Use Each

MethodUse When
L-BFGS-BSmooth objective, can provide gradient or approximated via finite differences, box constraints
SLSQPSmooth objective with inequality and equality constraints
Nelder-MeadNon-smooth objective, no constraints, low-dimensional
PowellNo gradient, moderate dimension, no constraints
trust-constrNon-linear constraints, robust to non-convex problems
from scipy.optimize import minimize
import numpy as np

# ------------------------------------------------------------------ #
# Use case 2: Optimise a metric that is not differentiable
# (e.g., F1 score as a function of a threshold)
# ------------------------------------------------------------------ #
from sklearn.metrics import f1_score

np.random.seed(99)
n = 10_000
y_true = np.random.randint(0, 2, n)
y_score = np.clip(y_true * 0.7 + np.random.randn(n) * 0.3, 0, 1)

def neg_f1(threshold_arr):
"""Negate F1 so we can minimise it."""
threshold = threshold_arr[0]
y_pred = (y_score >= threshold).astype(int)
return -f1_score(y_true, y_pred)

result = minimize(
neg_f1,
x0=[0.5],
method="Nelder-Mead", # F1 is non-smooth - gradient methods fail here
bounds=[(0.0, 1.0)],
options={"xatol": 1e-4, "fatol": 1e-4, "maxiter": 200},
)

best_threshold = result.x[0]
print(f"Optimal threshold: {best_threshold:.4f}")
print(f"Best F1: {-result.fun:.4f}")

differential_evolution for Global Optimisation

Standard minimize finds local minima. When the objective landscape has multiple local optima (common in ML), use differential_evolution - a population-based global optimiser.

from scipy.optimize import differential_evolution
import numpy as np

# ------------------------------------------------------------------ #
# Use case: Hyperparameter tuning with black-box evaluation
# ------------------------------------------------------------------ #
# We treat the ML model as a black box function: params → validation loss.
# Differential evolution explores the space globally.

def evaluate_model(alpha: float, gamma: float, n_estimators: int) -> float:
"""Simulated model evaluation - in practice this trains and evaluates a model."""
# Simulate a multi-modal loss surface with a global minimum
loss = (
(np.log10(alpha) + 2) ** 2 # optimal alpha ~ 0.01
+ (np.log10(gamma) + 1) ** 2 # optimal gamma ~ 0.1
+ 0.5 * (n_estimators / 100 - 1) ** 2 # optimal n_estimators ~ 100
+ np.random.randn() * 0.05 # noise
)
return float(loss)

def objective(params):
# Differential evolution passes a 1D array of parameter values
log_alpha, log_gamma, log_n = params
alpha = 10 ** log_alpha
gamma = 10 ** log_gamma
n_estimators = int(10 ** log_n)
return evaluate_model(alpha, gamma, n_estimators)

bounds = [
(-5, 0), # log10(alpha): 1e-5 to 1
(-3, 1), # log10(gamma): 1e-3 to 10
(1.0, 3.0), # log10(n_estimators): 10 to 1000
]

result = differential_evolution(
objective,
bounds=bounds,
maxiter=100, # max generations
popsize=15, # population size = 15 * n_params = 45 candidates
mutation=(0.5, 1.5), # mutation factor range
recombination=0.7, # crossover probability
seed=42,
tol=1e-6,
workers=1, # set to -1 for parallel (uses all CPU cores)
disp=False,
)

log_alpha, log_gamma, log_n = result.x
print(f"Best alpha: {10**log_alpha:.5f}")
print(f"Best gamma: {10**log_gamma:.5f}")
print(f"Best n_estimators: {int(10**log_n)}")
print(f"Best loss: {result.fun:.4f}")
print(f"Converged: {result.success}")

curve_fit for Model Calibration

from scipy.optimize import curve_fit
import numpy as np

# ------------------------------------------------------------------ #
# Use case: Fit a learning curve to predict training time / accuracy
# ------------------------------------------------------------------ #

# Empirically observed: training accuracy as a function of dataset size
# follows a power law: acc(n) = 1 - a * n^(-b)
def power_law(n, a, b):
return 1.0 - a * n ** (-b)

# Observed data points (train size → validation accuracy)
train_sizes = np.array([100, 250, 500, 1000, 2500, 5000, 10000], dtype=float)
val_accs = np.array([0.71, 0.76, 0.80, 0.83, 0.87, 0.89, 0.91])

# Add noise to simulate realistic measurement
np.random.seed(7)
val_accs_noisy = val_accs + np.random.randn(len(val_accs)) * 0.005

# Fit the power law
popt, pcov = curve_fit(
power_law,
train_sizes,
val_accs_noisy,
p0=[1.0, 0.5], # initial guess for (a, b)
bounds=([0, 0], [10, 2]),
maxfev=10_000,
)

a_fit, b_fit = popt
a_std, b_std = np.sqrt(np.diag(pcov)) # parameter standard errors

print(f"Fitted: a = {a_fit:.4f} ± {a_std:.4f}")
print(f"Fitted: b = {b_fit:.4f} ± {b_std:.4f}")

# Predict: how much data do we need to reach 95% accuracy?
# 0.95 = 1 - a * n^(-b) => n = (a / 0.05)^(1/b)
n_needed = (a_fit / 0.05) ** (1.0 / b_fit)
print(f"Estimated data needed for 95% acc: {n_needed:,.0f} samples")

# Predict accuracy at 100K samples
predicted_100k = power_law(100_000, a_fit, b_fit)
print(f"Predicted accuracy at 100K: {predicted_100k:.3f}")

Part 2 - Sparse Matrices

Sparse matrices store only non-zero elements. When data is very sparse (>90% zeros), sparse matrices use orders of magnitude less memory and run faster.

When to Use Sparse Matrices

  • NLP: TF-IDF or bag-of-words matrices - a 100K document, 200K vocabulary matrix is 100K * 200K = 20 billion elements, but each document uses ~200 unique words. Sparsity: 99.9%.
  • Recommender systems: user-item interaction matrices - 10M users, 1M items, but average user interacts with ~50 items. Sparsity: 99.995%.
  • Graph ML: adjacency matrices - most nodes are not connected to most other nodes.

CSR vs CSC

from scipy.sparse import csr_matrix, csc_matrix, coo_matrix
import numpy as np

# ------------------------------------------------------------------ #
# CSR (Compressed Sparse Row): efficient for row slicing and matrix-vector products
# CSC (Compressed Sparse Column): efficient for column slicing
# COO (Coordinate): efficient for incremental construction
# ------------------------------------------------------------------ #

# Build a 1000 x 5000 sparse matrix with 1% nonzero
np.random.seed(42)
n_rows, n_cols = 1_000, 5_000
density = 0.01
n_nonzero = int(n_rows * n_cols * density)

rows = np.random.randint(0, n_rows, n_nonzero)
cols = np.random.randint(0, n_cols, n_nonzero)
data = np.random.randn(n_nonzero).astype(np.float32)

# Build as COO (natural for (row, col, value) triples), then convert
coo = coo_matrix((data, (rows, cols)), shape=(n_rows, n_cols))
csr = coo.tocsr() # convert to CSR for computation
csc = coo.tocsc() # convert to CSC for column operations

print(f"Dense equivalent: {n_rows * n_cols * 4 / 1024**2:.1f} MB")
print(f"CSR memory: {(csr.data.nbytes + csr.indices.nbytes + csr.indptr.nbytes) / 1024:.1f} KB")
# Dense: 19.1 MB, CSR: ~240 KB - 80x reduction

# CSR internal structure:
# - data: non-zero values, length = n_nonzero
# - indices: column index for each value, length = n_nonzero
# - indptr: where each row starts in data/indices, length = n_rows + 1
print(f"data: {csr.data.shape}") # (n_nonzero,)
print(f"indices: {csr.indices.shape}") # (n_nonzero,)
print(f"indptr: {csr.indptr.shape}") # (n_rows + 1,)

Sparse Matrix Operations

from scipy.sparse import csr_matrix
import numpy as np
import scipy.sparse as sp

# ------------------------------------------------------------------ #
# NLP: TF-IDF vectorisation and cosine similarity
# ------------------------------------------------------------------ #
from sklearn.feature_extraction.text import TfidfVectorizer

documents = [
"the cat sat on the mat",
"the dog sat on the mat",
"the cat chased the dog",
"deep learning is a type of machine learning",
"machine learning uses neural networks",
"neural networks are used in deep learning",
] * 1_000 # 6K documents

vectoriser = TfidfVectorizer(max_features=10_000, min_df=2)
X = vectoriser.fit_transform(documents) # Returns CSR sparse matrix

print(f"Shape: {X.shape}") # (6000, ~100) - only ~100 unique terms
print(f"Type: {type(X)}")
print(f"Sparsity: {1 - X.nnz / (X.shape[0] * X.shape[1]):.1%}")

# Dense matrix-vector product: O(n_docs * vocab_size)
# Sparse matrix-vector product: O(n_docs * avg_terms_per_doc)
# For TF-IDF, avg terms per doc << vocab_size → sparse is much faster

# Cosine similarity between first document and all others
# (CSR format is optimal for this)
import time

query = X[0] # (1, n_features) sparse row

t0 = time.perf_counter()
# Sparse dot product: query (1, n_features) @ X.T (n_features, n_docs) → (1, n_docs)
similarities = (query @ X.T).toarray().ravel()
t_sparse = time.perf_counter() - t0

# Using scipy.sparse.linalg for the same
from sklearn.metrics.pairwise import cosine_similarity
t0 = time.perf_counter()
sim_sklearn = cosine_similarity(query, X).ravel()
t_sklearn = time.perf_counter() - t0

print(f"Sparse dot: {t_sparse*1000:.1f} ms")
print(f"sklearn cosine: {t_sklearn*1000:.1f} ms")
print(f"Top 3 similar docs: {np.argsort(similarities)[-3:]}")

Recommender Systems: User-Item Matrix

import scipy.sparse as sp
import numpy as np

def build_user_item_matrix(
user_ids: np.ndarray,
item_ids: np.ndarray,
ratings: np.ndarray,
n_users: int,
n_items: int,
) -> sp.csr_matrix:
"""
Build a sparse user-item interaction matrix.
Duplicate (user, item) pairs are summed.
"""
return sp.csr_matrix(
(ratings, (user_ids, item_ids)),
shape=(n_users, n_items),
dtype=np.float32,
)

# Simulate 10M interactions, 500K users, 100K items (0.02% density)
np.random.seed(42)
n_interactions = 10_000_000
n_users, n_items = 500_000, 100_000

user_ids = np.random.randint(0, n_users, n_interactions, dtype=np.int32)
item_ids = np.random.randint(0, n_items, n_interactions, dtype=np.int32)
ratings = np.ones(n_interactions, dtype=np.float32) # implicit feedback

R = build_user_item_matrix(user_ids, item_ids, ratings, n_users, n_items)
print(f"User-item matrix: {R.shape}, {R.nnz:,} non-zeros")
print(f"Density: {R.nnz / (n_users * n_items):.4%}")
print(f"Memory (sparse): {(R.data.nbytes + R.indices.nbytes + R.indptr.nbytes) / 1024**2:.0f} MB")
print(f"Memory (dense would be): {n_users * n_items * 4 / 1024**3:.1f} GB")
# Memory (sparse): ~114 MB
# Memory (dense would be): 186.3 GB - impossible

# User-based collaborative filtering: find users with similar interaction patterns
# Normalise rows for cosine similarity
from sklearn.preprocessing import normalize
R_norm = normalize(R, norm="l2") # Still sparse

# Similarity between user 0 and all others (sparse dot product)
user_sims = (R_norm[0] @ R_norm.T).toarray().ravel()
top_similar = np.argsort(user_sims)[-6:-1][::-1] # Top 5 (exclude self)
print(f"Top 5 similar users to user 0: {top_similar}")

Part 3 - scipy.stats: Statistical Testing for ML

Statistical tests are not just academic exercises. In ML engineering, you use them to:

  • Decide whether a feature is worth including (correlation, mutual information)
  • Validate A/B test results before deploying a new model
  • Detect data drift between training and serving distributions
  • Determine sample size requirements before running experiments

Hypothesis Testing for A/B Decisions

from scipy import stats
import numpy as np

# ------------------------------------------------------------------ #
# A/B test: did the new recommendation model improve click-through rate?
# ------------------------------------------------------------------ #
np.random.seed(42)

# Control (old model): 10K users, 12% CTR
n_control = 10_000
ctr_control = 0.12
clicks_control = np.random.binomial(1, ctr_control, n_control)

# Treatment (new model): 10K users, 13% CTR (1 percentage point improvement)
n_treatment = 10_000
ctr_treatment = 0.13
clicks_treatment = np.random.binomial(1, ctr_treatment, n_treatment)

print(f"Control CTR: {clicks_control.mean():.4f}")
print(f"Treatment CTR: {clicks_treatment.mean():.4f}")

# Two-sample t-test (valid for proportions with large n)
t_stat, p_value = stats.ttest_ind(clicks_treatment, clicks_control)
print(f"\nTwo-sample t-test:")
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.6f}")
print(f"Significant at alpha=0.05: {p_value < 0.05}")

# For proportions, the chi-squared test is more appropriate
from scipy.stats import chi2_contingency

# Contingency table: [[treatment clicks, treatment non-clicks], [control clicks, control non-clicks]]
contingency = np.array([
[clicks_treatment.sum(), n_treatment - clicks_treatment.sum()],
[clicks_control.sum(), n_control - clicks_control.sum()],
])
chi2, p_chi2, dof, expected = chi2_contingency(contingency)
print(f"\nChi-squared test:")
print(f"Chi2: {chi2:.4f}, p-value: {p_chi2:.6f}")

# Effect size: Cohen's h for proportions
p1 = clicks_treatment.mean()
p2 = clicks_control.mean()
cohens_h = 2 * np.arcsin(np.sqrt(p1)) - 2 * np.arcsin(np.sqrt(p2))
print(f"\nEffect size (Cohen's h): {abs(cohens_h):.4f}")
# < 0.2 = small, 0.2-0.5 = medium, > 0.5 = large

# Confidence interval for the difference in proportions
from statsmodels.stats.proportion import proportion_confint
ci_treatment = proportion_confint(clicks_treatment.sum(), n_treatment, alpha=0.05, method="wilson")
ci_control = proportion_confint(clicks_control.sum(), n_control, alpha=0.05, method="wilson")
print(f"\n95% CI - Treatment: [{ci_treatment[0]:.4f}, {ci_treatment[1]:.4f}]")
print(f"95% CI - Control: [{ci_control[0]:.4f}, {ci_control[1]:.4f}]")

Detecting Data Drift with the KS Test

from scipy import stats
import numpy as np

# ------------------------------------------------------------------ #
# Data drift detection: compare training distribution to production distribution
# ------------------------------------------------------------------ #

np.random.seed(42)

# Training distribution (collected in Jan 2024)
train_session_duration = np.random.exponential(scale=300, size=50_000) # mean 300s

# Production distribution after product change (Feb 2024)
# Users are spending less time - mean shifted to 240s
prod_session_duration = np.random.exponential(scale=240, size=10_000)

# Kolmogorov-Smirnov two-sample test
# H0: both samples are drawn from the same distribution
ks_stat, ks_p = stats.ks_2samp(train_session_duration, prod_session_duration)
print(f"KS statistic: {ks_stat:.4f}")
print(f"KS p-value: {ks_p:.2e}")
print(f"Drift detected: {ks_p < 0.01}") # p << 0.01 → reject H0 → drift confirmed

# Jensen-Shannon divergence for continuous monitoring (no p-value, but bounded [0, 1])
def js_divergence(p_samples: np.ndarray, q_samples: np.ndarray, n_bins: int = 50) -> float:
"""Jensen-Shannon divergence between two empirical distributions."""
# Compute histograms over combined range
all_min = min(p_samples.min(), q_samples.min())
all_max = max(p_samples.max(), q_samples.max())
bins = np.linspace(all_min, all_max, n_bins + 1)

p_hist, _ = np.histogram(p_samples, bins=bins, density=True)
q_hist, _ = np.histogram(q_samples, bins=bins, density=True)

# Normalise to probability distributions
p_hist = p_hist / p_hist.sum() + 1e-10
q_hist = q_hist / q_hist.sum() + 1e-10

m = 0.5 * (p_hist + q_hist)
kl_pm = np.sum(p_hist * np.log(p_hist / m))
kl_qm = np.sum(q_hist * np.log(q_hist / m))
return float(0.5 * (kl_pm + kl_qm))

jsd = js_divergence(train_session_duration, prod_session_duration)
print(f"\nJS Divergence: {jsd:.4f}")
# 0 = identical, 1 = completely different
# Threshold: flag if JSD > 0.05 in practice

# Monitor multiple features at once
features = {
"session_duration": (train_session_duration, prod_session_duration),
}
for feat, (train_vals, prod_vals) in features.items():
ks, p = stats.ks_2samp(train_vals, prod_vals)
print(f"{feat}: KS={ks:.3f}, p={p:.2e}, {'DRIFT' if p < 0.01 else 'OK'}")

Distribution Fitting

from scipy import stats
import numpy as np

# ------------------------------------------------------------------ #
# Fit a distribution to residuals after regression
# (useful for calibrating confidence intervals and anomaly detection)
# ------------------------------------------------------------------ #
np.random.seed(42)

# Simulate regression residuals (true: normal, but with heavy tails)
residuals = stats.t.rvs(df=5, scale=2, size=10_000) # t-distribution, 5 dof

# Test normality: Shapiro-Wilk (for n < 5000) and Kolmogorov-Smirnov
_, p_shapiro = stats.shapiro(residuals[:5000])
print(f"Shapiro-Wilk p-value: {p_shapiro:.4f}")
# p << 0.05 → reject normality

# Fit normal and t distributions, compare AIC
from scipy.optimize import minimize_scalar

# Normal fit
mu_n, sigma_n = stats.norm.fit(residuals)
log_lik_norm = np.sum(stats.norm.logpdf(residuals, mu_n, sigma_n))
aic_norm = -2 * log_lik_norm + 2 * 2 # 2 parameters

# t fit
df_t, loc_t, scale_t = stats.t.fit(residuals)
log_lik_t = np.sum(stats.t.logpdf(residuals, df_t, loc_t, scale_t))
aic_t = -2 * log_lik_t + 2 * 3 # 3 parameters

print(f"Normal AIC: {aic_norm:.1f}")
print(f"t AIC: {aic_t:.1f}")
print(f"t-distribution is better fit: {aic_t < aic_norm}")

# Use the fitted t-distribution for anomaly detection
# Flag residuals in the extreme 1% tails
lower = stats.t.ppf(0.005, df_t, loc_t, scale_t)
upper = stats.t.ppf(0.995, df_t, loc_t, scale_t)
anomalies = (residuals < lower) | (residuals > upper)
print(f"Anomaly rate (99% CI): {anomalies.mean():.2%}")

Statistical Power Analysis

Before running an A/B test, compute the required sample size. Running underpowered experiments is a common ML engineering mistake that wastes time and leads to wrong decisions.

from scipy import stats
import numpy as np

def minimum_sample_size(
baseline_rate: float,
min_detectable_effect: float,
alpha: float = 0.05,
power: float = 0.80,
) -> int:
"""
Compute minimum sample size per group for a two-proportion z-test.

Args:
baseline_rate: current conversion rate (e.g., 0.05 for 5%)
min_detectable_effect: minimum relative improvement to detect (e.g., 0.10 for 10%)
alpha: significance level (Type I error rate)
power: desired statistical power (1 - Type II error rate)
"""
p1 = baseline_rate
p2 = baseline_rate * (1 + min_detectable_effect)

# z-scores for alpha/2 and beta
z_alpha = stats.norm.ppf(1 - alpha / 2)
z_beta = stats.norm.ppf(power)

# Pooled proportion under H0
p_pooled = (p1 + p2) / 2

# Standard error under H0 and H1
se_h0 = np.sqrt(2 * p_pooled * (1 - p_pooled))
se_h1 = np.sqrt(p1 * (1 - p1) + p2 * (1 - p2))

n = ((z_alpha * se_h0 + z_beta * se_h1) / (p2 - p1)) ** 2
return int(np.ceil(n))

# Example: current model has 5% CTR. We want to detect a 10% relative improvement (to 5.5%)
# with 80% power at alpha=0.05
n = minimum_sample_size(
baseline_rate=0.05,
min_detectable_effect=0.10,
alpha=0.05,
power=0.80,
)
print(f"Required sample size per group: {n:,}")
# Required sample size per group: ~30,000
# Total experiment size: ~60,000 users

# Sensitivity analysis: how does sample size change with effect size?
effects = [0.05, 0.10, 0.15, 0.20, 0.30]
for effect in effects:
n = minimum_sample_size(0.05, effect)
print(f"Detect {effect:.0%} improvement: {n:,} per group")

Part 4 - scipy.spatial.distance: Pairwise Distance Computation

Computing distances between vectors is at the core of KNN, clustering, embedding search, and anomaly detection. scipy.spatial.distance provides vectorised implementations that outperform naive NumPy.

from scipy.spatial.distance import cdist, pdist, squareform
import numpy as np
import time

# ------------------------------------------------------------------ #
# pdist: pairwise distances within a single set of vectors
# cdist: distances between two sets of vectors
# ------------------------------------------------------------------ #

n = 2_000
d = 128 # embedding dimension (e.g., sentence encoder output)

X = np.random.randn(n, d).astype(np.float32)
Y = np.random.randn(500, d).astype(np.float32)

# --- pdist: all pairwise distances within X ---
# Returns condensed form (upper triangle), length = n*(n-1)/2
t0 = time.perf_counter()
dists_condensed = pdist(X, metric="euclidean")
t_pdist = time.perf_counter() - t0

# Full symmetric matrix from condensed form
dist_matrix = squareform(dists_condensed) # shape (2000, 2000)

print(f"pdist (2000 x 128): {t_pdist:.3f}s")
print(f"Condensed shape: {dists_condensed.shape}") # (1999000,)
print(f"Full matrix shape: {dist_matrix.shape}") # (2000, 2000)

# --- cdist: distances between X and Y ---
t0 = time.perf_counter()
cross_dists = cdist(X, Y, metric="euclidean") # shape (2000, 500)
t_cdist = time.perf_counter() - t0

print(f"cdist (2000 x 500, 128 dims): {t_cdist:.3f}s")
print(f"Cross distance matrix: {cross_dists.shape}")

# Nearest neighbour in Y for each point in X
nn_idx = cross_dists.argmin(axis=1) # shape (2000,) - index into Y rows
nn_dist = cross_dists.min(axis=1) # shape (2000,)

Available Distance Metrics

from scipy.spatial.distance import cdist
import numpy as np

X = np.random.randn(100, 50)
Y = np.random.randn(80, 50)

metrics = ["euclidean", "cosine", "manhattan", "chebyshev", "correlation"]

for metric in metrics:
D = cdist(X, Y, metric=metric)
print(f"{metric:15}: shape={D.shape}, range=[{D.min():.2f}, {D.max():.2f}]")
MetricWhen to Use
euclideanDense embeddings, KNN with raw features
cosineText embeddings, normalised vectors
manhattanRobust to outliers; sparse features
chebyshevWhen the worst-case dimension matters
correlationTime series similarity (mean-removed cosine)

K-Nearest Neighbours from Scratch

from scipy.spatial.distance import cdist
import numpy as np

class VectorizedKNN:
"""KNN classifier using scipy.spatial.distance for pairwise computation."""

def __init__(self, k: int = 5, metric: str = "euclidean"):
self.k = k
self.metric = metric
self.X_train: np.ndarray | None = None
self.y_train: np.ndarray | None = None

def fit(self, X: np.ndarray, y: np.ndarray) -> "VectorizedKNN":
self.X_train = X.astype(np.float32)
self.y_train = y
return self

def predict(self, X: np.ndarray) -> np.ndarray:
# Compute distances: (n_test, n_train)
# For large datasets, process in chunks to avoid memory OOM
chunk_size = 1_000
n_test = len(X)
predictions = np.empty(n_test, dtype=self.y_train.dtype)

for start in range(0, n_test, chunk_size):
end = min(start + chunk_size, n_test)
chunk = X[start:end].astype(np.float32)

# D: (chunk_size, n_train)
D = cdist(chunk, self.X_train, metric=self.metric)

# k nearest neighbour indices for each test point
knn_idx = np.argpartition(D, self.k, axis=1)[:, :self.k]

# Majority vote
knn_labels = self.y_train[knn_idx] # (chunk_size, k)
for i, labels in enumerate(knn_labels):
unique, counts = np.unique(labels, return_counts=True)
predictions[start + i] = unique[counts.argmax()]

return predictions


# Test
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

X, y = make_classification(n_samples=5_000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

knn = VectorizedKNN(k=7).fit(X_train, y_train)
y_pred = knn.predict(X_test)
print(f"KNN accuracy: {accuracy_score(y_test, y_pred):.4f}")

Part 5 - Signal Processing for Time Series ML

scipy.signal provides tools for filtering noise from sensor data, computing frequency domain features, and detecting events in time series - all common preprocessing steps before feeding time series data to ML models.

from scipy import signal
import numpy as np

# ------------------------------------------------------------------ #
# Use case: ECG signal denoising before arrhythmia classification
# ------------------------------------------------------------------ #

# Simulate a noisy time series (heart rate signal at 250 Hz)
fs = 250 # sampling frequency in Hz
duration = 10 # seconds
t = np.linspace(0, duration, fs * duration)

# Clean signal: 1 Hz heartbeat + 3 Hz harmonic
clean = np.sin(2 * np.pi * 1.0 * t) + 0.3 * np.sin(2 * np.pi * 3.0 * t)

# Add 50 Hz power line noise and high-frequency noise
noise = (
0.5 * np.sin(2 * np.pi * 50 * t) + # 50 Hz power line interference
0.2 * np.random.randn(len(t)) # broadband noise
)
noisy = clean + noise

# --- Butterworth bandpass filter (1-15 Hz): removes noise outside band ---
low_hz = 1.0 # lower cutoff
high_hz = 15.0 # upper cutoff

# Design filter: 5th-order Butterworth
sos = signal.butter(
N=5,
Wn=[low_hz, high_hz],
btype="bandpass",
fs=fs,
output="sos", # Second-order sections - numerically stable
)

# Apply with zero phase distortion (forward-backward filtering)
filtered = signal.sosfiltfilt(sos, noisy)

# Compute SNR before and after filtering
snr_before = 10 * np.log10(clean.var() / noise.var())
snr_after = 10 * np.log10(clean.var() / (filtered - clean).var())
print(f"SNR before filtering: {snr_before:.1f} dB")
print(f"SNR after filtering: {snr_after:.1f} dB")

Frequency Domain Features

from scipy import signal, fft
import numpy as np

def extract_spectral_features(
time_series: np.ndarray,
fs: float,
bands: dict[str, tuple[float, float]],
) -> dict[str, float]:
"""
Compute spectral power in specified frequency bands.
Used as features for time series classification.

Args:
time_series: 1D signal array
fs: sampling frequency in Hz
bands: dict mapping band name to (low_hz, high_hz)
"""
# Compute power spectral density via Welch's method
# Welch averages multiple periodograms → less variance than raw FFT
freqs, psd = signal.welch(
time_series,
fs=fs,
window="hann",
nperseg=256, # FFT window length
noverlap=128, # 50% overlap
)

total_power = np.trapz(psd, freqs) # numerical integration

features = {}
for band_name, (low, high) in bands.items():
mask = (freqs >= low) & (freqs <= high)
band_power = np.trapz(psd[mask], freqs[mask])
features[f"power_{band_name}"] = float(band_power)
features[f"rel_power_{band_name}"] = float(band_power / (total_power + 1e-10))

# Spectral entropy
psd_norm = psd / (psd.sum() + 1e-10)
psd_norm = psd_norm[psd_norm > 0]
features["spectral_entropy"] = float(-np.sum(psd_norm * np.log2(psd_norm)))

# Dominant frequency
features["dominant_freq"] = float(freqs[np.argmax(psd)])

return features


# EEG-like application: classify brain states by frequency band power
eeg_bands = {
"delta": (0.5, 4.0), # deep sleep
"theta": (4.0, 8.0), # drowsy
"alpha": (8.0, 13.0), # relaxed
"beta": (13.0, 30.0), # alert
"gamma": (30.0, 100.0), # active processing
}

fs = 250 # Hz
n_samples = fs * 10 # 10 seconds of EEG

# Simulate signal
t = np.linspace(0, 10, n_samples)
signal_eeg = (
2.0 * np.sin(2 * np.pi * 2 * t) + # delta
1.0 * np.sin(2 * np.pi * 6 * t) + # theta
0.5 * np.sin(2 * np.pi * 10 * t) + # alpha
0.3 * np.random.randn(n_samples)
)

features = extract_spectral_features(signal_eeg, fs, eeg_bands)
for k, v in features.items():
print(f" {k}: {v:.4f}")

Peak Detection

from scipy import signal
import numpy as np

# ------------------------------------------------------------------ #
# Use case: count heartbeats in an ECG signal
# ------------------------------------------------------------------ #

# Simulate R-peak detection in a synthetic ECG
fs = 360
duration = 10
t = np.linspace(0, duration, fs * duration)

# Generate synthetic ECG-like signal
heartbeat_rate = 1.2 # Hz (72 bpm)
ecg = np.zeros(len(t))
beat_times = np.arange(0, duration, 1.0 / heartbeat_rate)
for bt in beat_times:
idx = int(bt * fs)
if idx < len(ecg):
ecg[idx:idx+5] = [0.2, 0.5, 1.0, 0.5, 0.2] # simplified QRS complex

ecg += 0.1 * np.random.randn(len(ecg)) # add noise

# Find R-peaks (prominent peaks)
peaks, properties = signal.find_peaks(
ecg,
height=0.5, # minimum peak height
distance=int(0.5*fs), # minimum 0.5s between peaks (120 bpm max)
prominence=0.3, # minimum prominence
)

rr_intervals = np.diff(peaks) / fs # R-R intervals in seconds
heart_rate = 60 / rr_intervals # instantaneous heart rate in bpm

print(f"Detected {len(peaks)} R-peaks")
print(f"Mean heart rate: {heart_rate.mean():.1f} bpm")
print(f"HRV (std of RR intervals): {rr_intervals.std()*1000:.1f} ms")

Part 6 - scipy.linalg vs numpy.linalg

scipy.linalg provides a superset of numpy.linalg with more complete LAPACK bindings. For most operations, both give the same result. The differences matter when you need:

from scipy import linalg
import numpy as np
import time

# ------------------------------------------------------------------ #
# scipy.linalg.solve is faster and more numerically stable for
# solving linear systems Ax = b than np.linalg.inv(A) @ b
# ------------------------------------------------------------------ #
n = 2_000
A = np.random.randn(n, n).astype(np.float64)
b = np.random.randn(n).astype(np.float64)

# BAD: compute inverse then multiply - O(n^3) + O(n^2), loses precision
t0 = time.perf_counter()
x_bad = np.linalg.inv(A) @ b
t_inv = time.perf_counter() - t0

# GOOD: direct solver - O(n^3) but one fewer pass, better numerical stability
t0 = time.perf_counter()
x_good = linalg.solve(A, b)
t_solve = time.perf_counter() - t0

print(f"Via inv(): {t_inv:.3f}s, residual: {np.linalg.norm(A @ x_bad - b):.2e}")
print(f"Via solve(): {t_solve:.3f}s, residual: {np.linalg.norm(A @ x_good - b):.2e}")
# solve() is faster and more accurate

# ------------------------------------------------------------------ #
# scipy.linalg has LU, Cholesky, QR, Schur, Polar decompositions
# not all available in numpy.linalg
# ------------------------------------------------------------------ #

# Cholesky decomposition (for SPD matrices - common in Gaussian processes)
A_spd = A.T @ A + n * np.eye(n) # Make symmetric positive definite
L = linalg.cholesky(A_spd, lower=True)
# Verify: A_spd ≈ L @ L.T
residual = np.linalg.norm(A_spd - L @ L.T)
print(f"Cholesky residual: {residual:.2e}")

# Solve a system with an SPD matrix using Cholesky (2x faster than general solve)
t0 = time.perf_counter()
x_cho = linalg.cho_solve(linalg.cho_factor(A_spd), b)
t_cho = time.perf_counter() - t0
print(f"Cholesky solve: {t_cho:.3f}s, residual: {np.linalg.norm(A_spd @ x_cho - b):.2e}")

Interview Patterns

Pattern 1: Know when gradient-free optimisation is appropriate. When the objective function is non-differentiable (F1, AUROC, business metrics), non-smooth (tree-based model output), or the gradient is unavailable (black-box API), use scipy.optimize. For smooth differentiable objectives with available gradients, PyTorch's Adam is faster.

Pattern 2: Sparse is not always faster. Sparse matrix operations have higher per-element overhead than dense. The break-even point is roughly 10-20% density. Below 10% density, sparse is almost always better. Between 10-30%, benchmark both.

Pattern 3: p-value interpretation in high-dimensional ML. With millions of observations, almost any feature will be statistically significant (p < 0.05). Always report effect size alongside p-values. A statistically significant feature with Cohen's d of 0.01 is practically useless for a model.

Pattern 4: The KS test for monitoring. scipy.stats.ks_2samp is the standard test for detecting distribution shift between two samples. Set a threshold of p < 0.01 (conservative) for production monitoring - this controls the false positive rate at 1%. Complement with Jensen-Shannon divergence for a bounded (0-1) drift score that is easier to threshold.

Pattern 5: cdist for embedding similarity. For batch embedding search (top-k nearest neighbours across a corpus), scipy.spatial.distance.cdist is the right tool for corpora up to ~100K embeddings. Above that, use FAISS or similar approximate nearest neighbour libraries.

Key Takeaways

  • scipy.optimize.minimize handles any smooth objective. differential_evolution handles non-convex objectives with multiple local minima. curve_fit fits parametric models to empirical data - use it for learning curve prediction and model calibration.
  • Sparse matrices are essential for NLP (TF-IDF) and recommender systems (user-item matrices). Use CSR for row-wise operations and matrix-vector products, CSC for column-wise operations. The memory savings are often 100x-10000x over dense.
  • Statistical tests are decision tools: KS test for distribution drift detection, Mann-Whitney U for comparing non-normal distributions, chi-squared for proportions in A/B tests. Always compute effect size alongside p-values.
  • Always compute required sample size before running an A/B test. An underpowered test wastes engineering time and produces unreliable conclusions.
  • scipy.spatial.distance.cdist is the correct tool for pairwise distance computation between two sets of vectors. It dispatches to optimised C code for all standard metrics.
  • scipy.signal provides filtering and spectral analysis that turns raw sensor data into ML-ready features. Butterworth filters are a good default; always use sosfiltfilt for zero-phase filtering.
  • Use scipy.linalg.solve instead of np.linalg.inv(A) @ b. The direct solver is both faster and more numerically stable.

Practice Problems

Level 1 - Predict the Output

Problem 1: A model's validation AUROC improves from 0.823 to 0.831. With n=50,000 samples, the improvement is statistically significant (p < 0.001). Should you deploy the new model? What is missing from this analysis?

Answer

Statistical significance does not imply practical significance. An improvement of 0.008 in AUROC may or may not be meaningful depending on context:

  1. Effect size: the absolute difference is 0.008. For high-stakes decisions (medical, legal), even 0.001 matters. For CTR prediction, 0.008 may be negligible.
  2. Confidence interval on the difference: compute stats.ttest_rel or bootstrap confidence intervals on the AUROC difference across cross-validation folds.
  3. Business metric impact: map the AUROC improvement to revenue, error rate, or whatever the business cares about. AUROC is a proxy; the actual metric is what matters.
  4. Variance across folds: if the AUROC standard deviation across 5 folds is 0.010, an improvement of 0.008 is within the noise.

The answer is: you need effect size, confidence intervals, and a business metric mapping before deciding to deploy.

Problem 2: When would you use cdist vs pdist?

Answer
  • pdist(X): computes pairwise distances within a single matrix X. Returns condensed upper-triangle form. Use for clustering (DBSCAN, hierarchical), computing a kernel matrix over a single dataset.
  • cdist(X, Y): computes distances between two matrices. Use for KNN (queries X vs corpus Y), computing similarities between a query set and a reference set.

pdist is ~2x faster than cdist(X, X) for the same matrix because it only computes the upper triangle (exploiting symmetry).

Level 2 - Debug Challenge

This model calibration is producing badly calibrated output probabilities. Find the bug.

from scipy.optimize import minimize
import numpy as np

np.random.seed(42)
raw_scores = np.random.randn(10_000)
true_labels = (raw_scores + np.random.randn(10_000) * 0.5 > 0).astype(float)

def log_loss(params, scores, labels):
a, b = params
probs = 1.0 / (1.0 + np.exp(-(a * scores + b)))
return -np.mean(labels * np.log(probs) + (1 - labels) * np.log(1 - probs))

result = minimize(log_loss, x0=[1.0, 0.0], args=(raw_scores, true_labels), method="L-BFGS-B")
a, b = result.x

# Something is wrong - calibrated probs are outside [0, 1]
calibrated = a * raw_scores + b
print(calibrated.min(), calibrated.max())
Answer

The bug is on the last line: a * raw_scores + b computes the linear combination (logit), not the calibrated probability. The probability is the sigmoid of the logit.

# Fix:
calibrated_probs = 1.0 / (1.0 + np.exp(-(a * raw_scores + b)))
print(calibrated_probs.min(), calibrated_probs.max()) # [0, 1] range ✓

Also note: the log_loss function does not clip probs, so if a * scores + b is very large, np.log(1 - probs) returns -inf. Add np.clip(probs, 1e-9, 1 - 1e-9) to the function for numerical safety.

Level 3 - Design Challenge

You are building a data drift monitoring system for a production ML model. The model receives 50,000 requests per day. Each request has 30 numeric features. You want to detect when any feature's distribution has drifted significantly from the training distribution.

Design a system that:

  1. Computes daily drift scores for all 30 features
  2. Raises an alert when drift is detected at a statistically meaningful threshold
  3. Does not fire false alarms more than 5% of the time on a stable system (the multiple testing problem)
  4. Produces a single health score for the overall feature set
Solution Sketch
from scipy import stats
import numpy as np
from dataclasses import dataclass, field

@dataclass
class DriftMonitor:
"""
Production drift monitor using KS tests with Bonferroni correction.
"""
feature_names: list[str]
alpha: float = 0.05 # family-wise error rate
n_features: int = field(init=False)

def __post_init__(self):
self.n_features = len(self.feature_names)
# Bonferroni-corrected per-feature alpha
self.alpha_per_feature = self.alpha / self.n_features

def fit(self, X_train: np.ndarray) -> "DriftMonitor":
"""Store reference distribution from training data."""
assert X_train.shape[1] == self.n_features
self.reference = X_train
return self

def check(self, X_prod: np.ndarray) -> dict:
"""
Compare production sample against training distribution.
Returns per-feature drift scores and an overall health score.
"""
results = {}
ks_stats = np.zeros(self.n_features)
p_values = np.zeros(self.n_features)

for i, feat in enumerate(self.feature_names):
ks, p = stats.ks_2samp(
self.reference[:, i],
X_prod[:, i],
alternative="two-sided",
)
ks_stats[i] = ks
p_values[i] = p

# Bonferroni correction: flag drift if p < alpha/n_features
# Controls family-wise error rate at alpha even with 30 simultaneous tests
drift_flags = p_values < self.alpha_per_feature

# Jensen-Shannon divergence for a bounded health score
js_divs = np.array([
self._js_divergence(self.reference[:, i], X_prod[:, i])
for i in range(self.n_features)
])

# Overall health: mean JSD across features (0=healthy, 1=completely drifted)
health_score = 1.0 - js_divs.mean()

return {
"feature_drift": dict(zip(self.feature_names, drift_flags.tolist())),
"ks_statistics": dict(zip(self.feature_names, ks_stats.tolist())),
"p_values": dict(zip(self.feature_names, p_values.tolist())),
"js_divergences": dict(zip(self.feature_names, js_divs.tolist())),
"n_drifted": int(drift_flags.sum()),
"health_score": float(health_score),
"alert": bool(drift_flags.any()),
}

def _js_divergence(self, p: np.ndarray, q: np.ndarray, n_bins: int = 50) -> float:
lo = min(p.min(), q.min())
hi = max(p.max(), q.max())
bins = np.linspace(lo, hi, n_bins + 1)
ph, _ = np.histogram(p, bins=bins, density=True)
qh, _ = np.histogram(q, bins=bins, density=True)
ph = ph / (ph.sum() + 1e-10) + 1e-10
qh = qh / (qh.sum() + 1e-10) + 1e-10
m = 0.5 * (ph + qh)
return float(0.5 * np.sum(ph * np.log(ph/m)) + 0.5 * np.sum(qh * np.log(qh/m)))


# Usage
n_train = 100_000
n_features = 30
feature_names = [f"feature_{i}" for i in range(n_features)]

X_train = np.random.randn(n_train, n_features)
monitor = DriftMonitor(feature_names).fit(X_train)

# Stable day: no drift
X_stable = np.random.randn(50_000, n_features)
report_stable = monitor.check(X_stable)
print(f"Stable: {report_stable['n_drifted']} features drifted, health={report_stable['health_score']:.3f}")

# Drifted day: 3 features shift
X_drifted = np.random.randn(50_000, n_features)
X_drifted[:, :3] += 1.0 # shift first 3 features
report_drifted = monitor.check(X_drifted)
print(f"Drifted: {report_drifted['n_drifted']} features drifted, health={report_drifted['health_score']:.3f}")

Key design decisions: (1) Bonferroni correction controls the family-wise false alarm rate when running 30 simultaneous tests; (2) JSD provides a continuous, bounded health score that is easier to threshold than p-values; (3) store the full training distribution, not just statistics, to enable KS tests (which need the full empirical CDF).

© 2026 EngineersOfAI. All rights reserved.